package com.example.mqtt.config;

import cn.hutool.json.JSONUtil;
import com.example.mqtt.MyGateway;
import com.example.mqtt.dataSource.BaseSource;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.concurrent.TimeUnit;

@Configuration
public class MqttOutBound {
    @Autowired
    BaseSource baseSource;
    @Autowired
    MyGateway gateway;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://localhost:1883"});
        factory.setConnectionOptions(options);
        return factory;
    }


    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("outClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public IntegrationFlow sourceFlow() {
        return IntegrationFlows.
                from(baseSource, e -> e.poller(Pollers.fixedDelay(1, TimeUnit.MILLISECONDS))).
                transform(JSONUtil::toJsonStr).
                handle(message -> gateway.sendToMqtt("topic1/12", (String) message.getPayload())).
                get();
    }

}
